// Copyright 2018 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package service provides a prometheus-backed service that lets users query metrics.
package service

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/prometheus/client_golang/api"
	"github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/xiaomi/naftis/src/api/bootstrap"
	"github.com/xiaomi/naftis/src/api/log"
)

// Prom is the package-level prometheus service instance. It is nil until
// InitProm assigns it.
var Prom *prom

// prom combines the service-graph HTTP handler with a prometheus v1 API
// client used for direct queries.
type prom struct {
	// Embedded handler; its methods (ServeHTTP, generate) are promoted to *prom.
	promHandler
	// api is the prometheus v1 API client that Query calls.
	api v1.API
}

// Prometheus addresses selected by InitProm.
const (
	// localPromURL is used when bootstrap.Args.InCluster is false.
	localPromURL    = "http://localhost:9090"
	// istioPromURLFmt is used in-cluster; %s is filled with the Istio namespace.
	istioPromURLFmt = "http://prometheus.%s:9090"
)

// InitProm initializes the package-level Prom service.
//
// The prometheus address is localPromURL unless bootstrap.Args.InCluster is
// set, in which case it is built from bootstrap.Args.IstioNamespace using
// istioPromURLFmt. If the API client cannot be constructed the failure is
// logged together with the address and the underlying error. Prom is assigned
// regardless, because the embedded handler builds its own client per request
// and reports that error to the HTTP caller.
func InitProm() {
	addr := localPromURL
	if bootstrap.Args.InCluster {
		addr = fmt.Sprintf(istioPromURLFmt, bootstrap.Args.IstioNamespace)
	}
	client, err := api.NewClient(api.Config{Address: addr})
	if err != nil {
		// Log the cause and the address; a bare "init fail" cannot be diagnosed.
		log.Error("[Prom] init fail", "addr", addr, "err", err)
	}
	Prom = &prom{
		promHandler: promHandler{
			addr:   addr,
			static: &Static{Nodes: make(map[string]struct{})},
			writer: GenerateD3JSON,
		},
		api: v1.NewAPI(client),
	}
}

// Query evaluates the query string q against prometheus at the current time
// and returns the resulting value, or the error reported by the API client.
func (p *prom) Query(q string) (model.Value, error) {
	ctx := context.Background()
	now := time.Now()
	return p.api.Query(ctx, q, now)
}

// Graph represents a generated service graph: a set of node names plus the
// edges that connect them.
type Graph struct {
	// Nodes is the set of node names; the empty-struct values carry no data.
	Nodes map[string]struct{} `json:"nodes"`
	// Edges holds the connections, in the order they were added.
	Edges []*Edge             `json:"edges"`
}

type (
	// Static represents a service graph generated by API calls that is
	// meant to persist across generation requests. It must be merged with
	// a Dynamic graph to provide a complete service graph for Istio.
	Static struct {
		// Nodes is the set of node names that always appear in the graph.
		Nodes map[string]struct{}
	}

	// Dynamic represents a service graph generated on the fly. Its zero
	// value is ready to use: AddEdge and Merge allocate Nodes on demand.
	Dynamic struct {
		// Nodes is the set of node names; the empty-struct values carry no data.
		Nodes map[string]struct{} `json:"nodes"`
		// Edges holds the connections, in the order they were added.
		Edges []*Edge `json:"edges"`
	}

	// Edge represents an edge in a dynamic service graph.
	Edge struct {
		// Source is the name of the node the edge starts at.
		Source string `json:"source"`
		// Target is the name of the node the edge ends at.
		Target string `json:"target"`
		// Labels carries the annotations attached to the edge.
		Labels Attributes `json:"labels"`
	}

	// Attributes contain a set of annotations for an edge.
	Attributes map[string]string

	// SerializeFn provides a mechanism for writing out the service graph.
	SerializeFn func(w io.Writer, g *Dynamic) error
)

// AddEdge appends an edge from src to target annotated with lbls and records
// both endpoints in the node set. The node set is allocated on first use, so
// calling AddEdge on a zero-value Dynamic does not panic.
func (d *Dynamic) AddEdge(src, target string, lbls map[string]string) {
	if d.Nodes == nil {
		d.Nodes = make(map[string]struct{})
	}
	d.Edges = append(d.Edges, &Edge{Source: src, Target: target, Labels: lbls})
	d.Nodes[src] = struct{}{}
	d.Nodes[target] = struct{}{}
}

// Merge adds all of the nodes in the static graph into the dynamic graph.
// A nil or empty static graph is a no-op; otherwise the node set is allocated
// if the receiver does not have one yet.
func (d *Dynamic) Merge(static *Static) {
	if static == nil || len(static.Nodes) == 0 {
		return
	}
	if d.Nodes == nil {
		d.Nodes = make(map[string]struct{}, len(static.Nodes))
	}
	for node := range static.Nodes {
		d.Nodes[node] = struct{}{}
	}
}

type (
	// d3Graph is the JSON shape written by GenerateD3JSON: a list of nodes
	// plus links that refer to those nodes by position.
	d3Graph struct {
		Nodes []d3Node `json:"nodes"`
		Links []d3Link `json:"links"`
	}
	// d3Node is a single named node in a d3Graph.
	d3Node struct {
		Name string `json:"name"`
	}
	// d3Link is an edge in a d3Graph. Source and Target are indices into
	// d3Graph.Nodes; Labels carries the edge annotations.
	d3Link struct {
		Source int        `json:"source"`
		Target int        `json:"target"`
		Labels Attributes `json:"labels"`
	}
)

// indexOf reports the position of the first entry in nodes whose Name equals
// name. When no entry matches it returns 0 and an "invalid graph" error.
func indexOf(nodes []d3Node, name string) (int, error) {
	for idx := 0; idx < len(nodes); idx++ {
		if nodes[idx].Name != name {
			continue
		}
		return idx, nil
	}
	return 0, errors.New("invalid graph")
}

// GenerateD3JSON converts the standard Dynamic graph to d3Graph, then
// serializes it to w as JSON.
//
// Nodes are emitted in map-iteration order, so node positions (and therefore
// the link indices) are not guaranteed to be the same between calls. If an
// edge refers to a node that is not present in g.Nodes, an "invalid graph"
// error is returned and nothing is written to w.
func GenerateD3JSON(w io.Writer, g *Dynamic) error {
	graph := d3Graph{
		Nodes: make([]d3Node, 0, len(g.Nodes)),
		Links: make([]d3Link, 0, len(g.Edges)),
	}
	// Record each node's slice position as it is appended so that edge
	// endpoints resolve with a single map lookup instead of a linear scan.
	position := make(map[string]int, len(g.Nodes))
	for name := range g.Nodes {
		position[name] = len(graph.Nodes)
		graph.Nodes = append(graph.Nodes, d3Node{Name: name})
	}
	for _, edge := range g.Edges {
		src, ok := position[edge.Source]
		if !ok {
			return errors.New("invalid graph")
		}
		dst, ok := position[edge.Target]
		if !ok {
			return errors.New("invalid graph")
		}
		graph.Links = append(graph.Links, d3Link{
			Source: src,
			Target: dst,
			Labels: edge.Labels,
		})
	}
	return json.NewEncoder(w).Encode(graph)
}

// reqsFmt is the query template for the per-edge request rate. The first %s
// receives additional label matchers (see generateServiceFilter) and the
// second %s receives the rate window.
const reqsFmt = "sum(rate(istio_requests_total{reporter=\"destination\"%s}[%s])) by (source_workload, destination_workload, source_app, destination_app)"
// tcpFmt is the query template for the per-edge TCP received-bytes rate; its
// placeholders have the same meaning as in reqsFmt.
const tcpFmt = "sum(rate(istio_tcp_received_bytes_total{reporter=\"destination\"%s}[%s])) by (source_workload, destination_workload, source_app, destination_app)"
// emptyFilter is appended to a query when empty results should be filtered
// out; it keeps only series whose value is greater than zero.
const emptyFilter = " > 0"

// genOpts carries the per-request options used to build the graph queries.
type genOpts struct {
	// timeHorizon is the rate window substituted into the query templates.
	timeHorizon  string
	// filterEmpty, when true, appends emptyFilter to each query.
	filterEmpty  bool
	// dstNamespace filters on the destination_namespace label when non-empty.
	dstNamespace string
	// dstWorkload filters on the destination_workload label when non-empty.
	dstWorkload  string
	// srcNamespace filters on the source_workload_namespace label when non-empty.
	srcNamespace string
	// srcWorkload filters on the source_workload label when non-empty.
	srcWorkload  string
}

// promHandler serves service-graph data built from prometheus queries.
type promHandler struct {
	// addr is the prometheus address a client is created for on each generate call.
	addr   string
	// static holds nodes that are merged into every generated graph.
	static *Static
	// writer serializes the final graph to the HTTP response.
	writer SerializeFn
}

// NewPromHandler builds an http.Handler that answers servicegraph requests by
// querying the prometheus backend at addr, merging in the nodes of static and
// serializing the result with writer.
func NewPromHandler(addr string, static *Static, writer SerializeFn) http.Handler {
	handler := &promHandler{
		addr:   addr,
		static: static,
		writer: writer,
	}
	return handler
}

// AddEdge appends an edge from src to target annotated with lbls to the graph
// and records both endpoints in the node set. The node set is allocated on
// first use, so calling AddEdge on a zero-value Graph does not panic.
func (g *Graph) AddEdge(src, target string, lbls map[string]string) {
	if g.Nodes == nil {
		g.Nodes = make(map[string]struct{})
	}
	g.Edges = append(g.Edges, &Edge{Source: src, Target: target, Labels: lbls})
	g.Nodes[src] = struct{}{}
	g.Nodes[target] = struct{}{}
}

// ServeHTTP generates the service graph for the request and writes it with
// the handler's serializer.
//
// Recognized query parameters:
//   - time_horizon: rate window, defaults to "5m"; must parse as a duration.
//   - filter_empty: "true" filters out empty series.
//   - destination_namespace, destination_workload, source_namespace,
//     source_workload: optional label filters.
//
// Any failure (invalid time_horizon, query error, serialization error) is
// reported through writeError.
func (p *promHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Parse the query string once; r.URL.Query() re-parses it on every call.
	params := r.URL.Query()

	timeHorizon := params.Get("time_horizon")
	if timeHorizon == "" {
		timeHorizon = "5m"
	}
	// validate time_horizon
	if _, err := model.ParseDuration(timeHorizon); err != nil {
		writeError(w, fmt.Errorf("could not parse time_horizon: %v", err))
		return
	}

	g, err := p.generate(genOpts{
		timeHorizon:  timeHorizon,
		filterEmpty:  params.Get("filter_empty") == "true",
		dstNamespace: params.Get("destination_namespace"),
		dstWorkload:  params.Get("destination_workload"),
		srcNamespace: params.Get("source_namespace"),
		srcWorkload:  params.Get("source_workload"),
	})
	// The error must be checked before g is used: generate returns a nil
	// graph on failure, and merging into it would dereference nil.
	if err != nil {
		writeError(w, err)
		return
	}
	if p.static != nil {
		g.Merge(p.static)
	}
	if err := p.writer(w, g); err != nil {
		writeError(w, err)
	}
}

// writeError replies with HTTP 500 and err's message as the response body,
// then logs both err and the result of writing the body.
func writeError(w http.ResponseWriter, err error) {
	w.WriteHeader(http.StatusInternalServerError)
	body := []byte(err.Error())
	_, writeErr := w.Write(body)
	log.Error("[Graph] writeError", "err", writeErr, "orgerr", err)
}

// generate builds the dynamic service graph for opts. It creates a prometheus
// client for p.addr, runs the request-rate query and then the TCP-bytes
// query, and merges the two resulting graphs. The first error encountered is
// returned with a nil graph.
func (p *promHandler) generate(opts genOpts) (*Dynamic, error) {
	client, err := api.NewClient(api.Config{Address: p.addr})
	if err != nil {
		return nil, err
	}
	promAPI := v1.NewAPI(client)
	filter := generateServiceFilter(opts)

	// buildQuery fills a query template with the label filter and the rate
	// window, appending the empty-series filter when requested.
	buildQuery := func(tmpl string) string {
		q := fmt.Sprintf(tmpl, filter, opts.timeHorizon)
		if opts.filterEmpty {
			q += emptyFilter
		}
		return q
	}

	reqGraph, err := extractGraph(promAPI, buildQuery(reqsFmt), "reqs/sec")
	if err != nil {
		return nil, err
	}
	tcpGraph, err := extractGraph(promAPI, buildQuery(tcpFmt), "bytes/sec")
	if err != nil {
		return nil, err
	}
	return merge(reqGraph, tcpGraph)
}

// generateServiceFilter renders the non-empty filter options in opts as
// label matchers to splice into a query's selector. Each matcher is preceded
// by ", " so the result can directly follow an existing matcher; when no
// filter is set the result is the empty string.
//
// Values originate from request parameters, so they are quoted with
// strconv.Quote rather than wrapped in bare double quotes. For plain values
// the output is unchanged, while embedded quotes or backslashes are escaped
// and cannot terminate the string literal and alter the query.
func generateServiceFilter(opts genOpts) string {
	matchers := []struct{ label, value string }{
		{"destination_namespace", opts.dstNamespace},
		{"destination_workload", opts.dstWorkload},
		{"source_workload_namespace", opts.srcNamespace},
		{"source_workload", opts.srcWorkload},
	}

	filterStr := ""
	for _, m := range matchers {
		if m.value == "" {
			continue
		}
		filterStr += ", " + m.label + "=" + strconv.Quote(m.value)
	}
	return filterStr
}

// merge returns a new graph containing the edges of g1 followed by the edges
// of g2, and the union of both node sets. The returned error is always nil.
func merge(g1, g2 *Dynamic) (*Dynamic, error) {
	combined := &Dynamic{
		Nodes: make(map[string]struct{}),
		Edges: make([]*Edge, 0),
	}
	for _, part := range []*Dynamic{g1, g2} {
		combined.Edges = append(combined.Edges, part.Edges...)
		for name, marker := range part.Nodes {
			combined.Nodes[name] = marker
		}
	}
	return combined, nil
}

// extractGraph runs query against prometheus at the current time and turns
// each sample of the resulting vector into an edge.
//
// Node names have the form "<app> (<workload>)", built from the sample's
// source_app/source_workload and destination_app/destination_workload labels.
// Each edge carries one attribute keyed by label whose value is the sample
// value formatted with six decimal places. A query error is returned as is;
// a result that is not a vector yields an "unknown value type" error.
func extractGraph(api v1.API, query, label string) (*Dynamic, error) {
	val, err := api.Query(context.Background(), query, time.Now())
	if err != nil {
		return nil, err
	}
	// A checked assertion (instead of val.Type() followed by an unchecked
	// one) sends a nil or unexpected value to the error path, not a panic.
	vector, ok := val.(model.Vector)
	if !ok {
		return nil, fmt.Errorf("unknown value type returned from query: %#v", val)
	}

	d := Dynamic{Nodes: map[string]struct{}{}, Edges: []*Edge{}}
	for _, sample := range vector {
		// todo: add error checking here
		// NOTE(review): a missing label presumably reads as an empty string,
		// producing node names such as " ()" — confirm whether to skip these.
		metric := sample.Metric
		srcWorkload := string(metric["source_workload"])
		src := string(metric["source_app"])
		dstWorkload := string(metric["destination_workload"])
		dst := string(metric["destination_app"])

		d.AddEdge(
			src+" ("+srcWorkload+")",
			dst+" ("+dstWorkload+")",
			Attributes{
				label: strconv.FormatFloat(float64(sample.Value), 'f', 6, 64),
			})
	}
	return &d, nil
}
